package com.gitee.charlesvhe.nifi.processors;

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

@TriggerSerially
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"CDC", "Debezium", "MySQL", "PostgreSQL", "Oracle", "SQL Server", "Db2", "Cassandra", "MongoDB"})
@CapabilityDescription("Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. " +
        "Debezium records all row-level changes within each database table in a change event stream, " +
        "and applications simply read these streams to see the change events in the same order in which they occurred.")
@Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, " +
        "such that it can continue from the same location if restarted.")
@DynamicProperty(name = "Engine Properties",
        value = "Attribute Expression Language",
        description = "https://debezium.io/documentation/reference/1.2/development/engine.html")
@DefaultSchedule(period = "10 sec")
public class ChangeDataCaptureProcessor extends AbstractProcessor {
    public static final PropertyDescriptor BACKPRESSURE_PERIOD = new PropertyDescriptor.Builder()
            .name("backpressure_period")
            .description("backpressure check period (1.5x of 'Run Schedule' value). CDC engine will stop if onTrigger not called (backpressure happened).")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("15 sec")
            .build();

    public static final String CASSANDRA_CONNECTOR_KEY = "io.debezium.connector.cassandra.CassandraConnector";
    public static final PropertyDescriptor CONNECTOR_CLASS = new PropertyDescriptor.Builder()
            .name("connector.class")
            .required(true)
            .allowableValues(
                    MySqlConnector.class.getName(),
                    PostgresConnector.class.getName(),
//                    OracleConnector.class.getName(),
//                    Db2Connector.class.getName(),
//                    MongoDbConnector.class.getName(),
                    CASSANDRA_CONNECTOR_KEY
            )
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();

    public static final List<PropertyDescriptor> LIST_PROPERTY = Arrays.asList(BACKPRESSURE_PERIOD, CONNECTOR_CLASS);
    public static final Set<Relationship> SET_RELATIONSHIP = new HashSet<>(Collections.singletonList(REL_SUCCESS));


    private Thread debeziumEngineThread;
    private DebeziumEngine<ChangeEvent<String, String>> debeziumEngine;
    private Long expireTimestamp;
    private ProcessSession currentSession;

    @Override
    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        expireTimestamp = System.currentTimeMillis() + processContext.getProperty(BACKPRESSURE_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
        currentSession = processSession;

        // init
        if (debeziumEngine == null) {
            debeziumEngine = DebeziumEngine.create(Json.class)
                    .using(buildEngineProperties(processContext))
                    .using((success, message, error) -> {
                        if (error != null) {
                            getLogger().info(error.getMessage());
                            this.onUnscheduled();
                        }
                    })
                    .notifying((records, committer) -> {
                        if (System.currentTimeMillis() > expireTimestamp) {
                            throw new RuntimeException("backpressure happened. expireTimestamp: " + expireTimestamp);
                        }
                        ProcessSession session = getCurrentSession();
                        for (ChangeEvent<String, String> record : records) {
                            FlowFile flowFile = getCurrentSession().create();
                            flowFile = session.write(flowFile, outputStream -> {
                                outputStream.write(record.value().getBytes());
                                outputStream.flush();
                            });
                            session.transfer(flowFile, REL_SUCCESS);
                            session.getProvenanceReporter().receive(flowFile, "<unknown>");
                            committer.markProcessed(record);
                        }
                        session.commit();
                        committer.markBatchFinished();
                    }).build();

            debeziumEngineThread = new Thread(debeziumEngine);
            debeziumEngineThread.start();
        }
    }

    public ProcessSession getCurrentSession() {
        return currentSession;
    }

    private Properties buildEngineProperties(ProcessContext context) {
        Properties properties = new Properties();
        context.getAllProperties().forEach((key, value) -> {
            if (key != null && value != null) {
                properties.put(key, value);
            }
        });

        // default global config
        properties.putIfAbsent("name", this.getIdentifier());
        // TODO use nifi persistent storage
        properties.putIfAbsent("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");

        if (PostgresConnector.class.getName().equals(properties.getProperty(CONNECTOR_CLASS.getName()))) {
            // default PostgresConnector config
            properties.putIfAbsent("plugin.name", "pgoutput");

            String dbname = properties.getProperty("database.dbname");
            properties.putIfAbsent("database.server.name", dbname);
            properties.putIfAbsent("slot.name", "debezium_" + dbname);
            properties.putIfAbsent("publication.name", "dbz_publication_" + dbname);
        } else if (MySqlConnector.class.getName().equals(properties.getProperty(CONNECTOR_CLASS.getName()))) {
            // default MySqlConnector config
        }
        return properties;
    }

    @OnUnscheduled
    public void onUnscheduled() {
        try {
            debeziumEngine.close();
        } catch (IOException e) {
            getLogger().info(e.getMessage());
        } finally {
            debeziumEngine = null;
        }
    }

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .dynamic(true)
                .sensitive(propertyDescriptorName.contains("password"))
                .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
                .build();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return LIST_PROPERTY;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return SET_RELATIONSHIP;
    }
}
